feat(cache): KEP-2655: Adding cache initializer #2793
Conversation
Pull Request Test Coverage Report for Build 18470076977 (Details)
💛 - Coveralls
Force-pushed f542e4a to 87003f6
Thanks @akshaychitneni!
I left my initial comments.
/milestone v2.1
Force-pushed 2392316 to e20b1c1
Force-pushed 5c77408 to e3b9fad
Thanks @akshaychitneni, I left a few comments.
cc @kubeflow/kubeflow-trainer-team @rudeigerc appreciate your review as well!
```python
@dataclass
class CacheDatasetInitializer:
    storage_uri: str
    train_job_name: str
```
This also is not needed; the TrainJob name is equal to the JobSet name.
We can set the `TRAIN_JOB_NAME` env variable in the ClusterTrainingRuntime, which gets its value from
`metadata.labels['jobset.sigs.k8s.io/jobset-name']`.
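For context, the initializer side of this suggestion could look like the sketch below, assuming the runtime injects `TRAIN_JOB_NAME` via the downward API; the helper name is hypothetical, not part of the PR.

```python
import os


def get_train_job_name() -> str:
    # TRAIN_JOB_NAME is assumed to be injected by the runtime via the
    # downward API from metadata.labels['jobset.sigs.k8s.io/jobset-name'].
    name = os.environ.get("TRAIN_JOB_NAME", "")
    if not name:
        raise RuntimeError("TRAIN_JOB_NAME env variable is not set")
    return name
```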
Yes. I plan to set it as an env var on the initializer container using the downward API, and here I am accessing that env var to use the TrainJob name for the ownerRef.
@akshaychitneni If you get TrainJob name from the env, you don't need to set it in the CacheDatasetInitializer.
Similar to namespace:
https://github.com/kubeflow/trainer/pull/2793/files#diff-ed9e751df204997160579feb800b458887ed801b5caab572c0b5142b2e63129bR52
But isn't it similar to fetching from the env variable? For namespace, I am not using an env var.
CacheDatasetInitializer is a config that we expose to the end user. Since the TrainJob name will always be set via an env variable, you don't need to expose this parameter in the config that the user can adjust.
Does it make sense @akshaychitneni?
This is not directly exposed to the users, though; we create it from env vars:
trainer/pkg/initializers/utils/utils.py, line 42 in b918411:

```python
def get_config_from_env(config) -> Dict[str, str]:
```
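To make the discussion concrete, a helper like the referenced one might map dataclass fields to upper-cased env vars as sketched below; the upper-casing convention and skipping of unset variables are assumptions, not necessarily the exact utils.py behavior.

```python
import os
from dataclasses import fields
from typing import Dict


def get_config_from_env(config) -> Dict[str, str]:
    # Sketch: for each dataclass field (e.g. storage_uri), read the
    # matching upper-cased env var (STORAGE_URI); unset vars are skipped.
    result: Dict[str, str] = {}
    for field in fields(config):
        value = os.environ.get(field.name.upper())
        if value is not None:
            result[field.name] = value
    return result
```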
I see, so you are going to have the TRAIN_JOB_NAME env being set in your ClusterTrainingRuntime, correct?
Yes
pkg/initializers/dataset/cache.py (outdated):

```python
logging.info(f"Created LeaderWorkerSet {train_job_name}-cache")

# Create Service
service = client.V1Service(
```
LWS doesn't have any API to create a Service automatically?
Maybe @kerthcet @kannon92 @ahg-g @ardaguclu knows about it?
I think the LWS controller doesn't create/manage Service objects, and it is up to clients to define them.
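Since the client has to define the Service itself, a headless Service manifest for the cache cluster might look like the sketch below; the selector label key and the port number are illustrative assumptions, not taken from the PR.

```python
def build_cache_service(train_job_name: str, namespace: str) -> dict:
    # Sketch of a headless Service for the cache LeaderWorkerSet, so each
    # cache pod gets a stable DNS record. Label key and port are assumed.
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {
            "name": f"{train_job_name}-cache",
            "namespace": namespace,
        },
        "spec": {
            "clusterIP": "None",  # headless: per-pod DNS records
            "selector": {
                "leaderworkerset.sigs.k8s.io/name": f"{train_job_name}-cache"
            },
            "ports": [{"name": "cache", "port": 50051}],
        },
    }
```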
Force-pushed dface27 to efbebae
Thanks @akshaychitneni, just left a few comments.
/cc @kubeflow/kubeflow-trainer-team @rudeigerc in case you want to leave more comments.
```python
    ],
)
def test_default_values(test_name, config_values, expected_defaults):
```
Do you need this test case, since you verify cluster creation here:

```python
def test_create_cache_cluster(test_name, test_case):
```
Here I am validating the config, and in test_create_cache_cluster I am looking for the k8s API calls.
@akshaychitneni Can you just create another test case in the test_create_cache_cluster function?
```diff
@@ -0,0 +1,356 @@
from unittest.mock import MagicMock, patch
```
@akshaychitneni Are you going to add integration tests in the future PRs?
```python
"HuggingFace - Invalid dataset",
```
Yes. I will work on adding integration tests in the future PRs
```python
config_dict = utils.get_config_from_env(types.CacheDatasetInitializer)
self.config = types.CacheDatasetInitializer(**config_dict)

def download_dataset(self):
```
@akshaychitneni Did you review this? Even though the DatasetProvider interface doesn't have this API, we can still directly call the cache.create_cache_cluster() API here:

```python
cache.download_dataset()
```
Force-pushed efbebae to e3a6544
```python
    "worker_cpu": "8",
    "worker_mem": "16Gi",
},
"expected_substitutions": {
```
Where do you verify expected_substitutions ?
Force-pushed 5fc611b to 27c69ec
Thanks @akshaychitneni!
/lgtm
/assign @astefanutti @Electronic-Waste @tenzen-y @rudeigerc
```python
# Get TrainJob for owner reference
try:
    training_job = custom_api.get_namespaced_custom_object(
```
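For context, a fetched TrainJob object is typically turned into an ownerReference roughly as sketched below, so the cache resources are garbage-collected with the TrainJob; the helper is illustrative, not the PR's exact code.

```python
def owner_reference(train_job: dict) -> dict:
    # Build an ownerReference from a TrainJob fetched via the custom
    # objects API; setting controller/blockOwnerDeletion ties the cache
    # resources' lifecycle to the TrainJob.
    return {
        "apiVersion": train_job["apiVersion"],
        "kind": train_job["kind"],
        "name": train_job["metadata"]["name"],
        "uid": train_job["metadata"]["uid"],
        "controller": True,
        "blockOwnerDeletion": True,
    }
```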
What gives the permissions to the TrainJob initializer to perform those requests to the API server?
The runtime should be configured with the initializer using a ServiceAccount that has the relevant permissions. We plan to document it.
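An illustrative RBAC Role for that ServiceAccount, expressed here as a Python dict for readability, might grant the verbs the initializer's API calls need; the API groups and resource names are assumptions based on the discussion, not a documented manifest from the PR.

```python
def cache_initializer_role(namespace: str) -> dict:
    # Sketch of a namespaced Role for the cache initializer's
    # ServiceAccount; bind it with a matching RoleBinding.
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": "cache-initializer", "namespace": namespace},
        "rules": [
            {  # read the TrainJob to build the ownerRef
                "apiGroups": ["trainer.kubeflow.org"],
                "resources": ["trainjobs"],
                "verbs": ["get"],
            },
            {  # create the cache LeaderWorkerSet
                "apiGroups": ["leaderworkerset.x-k8s.io"],
                "resources": ["leaderworkersets"],
                "verbs": ["create", "get"],
            },
            {  # create the companion Service
                "apiGroups": [""],
                "resources": ["services"],
                "verbs": ["create"],
            },
        ],
    }
```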
```python
schema_name = self.schema_name

# Load Kubernetes configuration
config.load_incluster_config()
```
I'm not too deep into the design of this, so apologies for the out-of-context comment, but my first reaction is: should all this be part of the control plane and not the runtime?
You are right; ideally we should move it to the operator, we just didn't get a chance to work on this.
@akshaychitneni Maybe as a workaround before building a cache controller, we can use trainer-controller-manager to create the LWS with the appropriate spec (e.g. the cache plugin can be activated when storageURI is set as follows: cache://database/table).
Yes, that was the initial plan: to add it as a plugin to the trainer. Since we intend to eventually leverage its own operator, we haven't pursued that path. I think we can revisit this approach.
```python
annotations={
    "eks.amazonaws.com/sts-regional-endpoints": "true",
    "eks.amazonaws.com/role-arn": iam_role,
},
```
Should that be made configurable?
Makes sense.
Our initial implementation only supports S3 via IAM. I think it is good to make this configurable once we support additional providers.
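One way to make those annotations configurable, sketched below, is to read them from an env var instead of hard-coding the EKS/IAM pair; `CACHE_SA_ANNOTATIONS` is a hypothetical name, not an existing setting in the PR.

```python
import json
import os
from typing import Dict


def service_account_annotations() -> Dict[str, str]:
    # Hypothetical: CACHE_SA_ANNOTATIONS holds a JSON map of
    # provider-specific ServiceAccount annotations, e.g.
    # '{"eks.amazonaws.com/role-arn": "arn:aws:iam::123456789012:role/cache"}'.
    raw = os.environ.get("CACHE_SA_ANNOTATIONS", "")
    return json.loads(raw) if raw else {}
```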
Signed-off-by: Akshay Chitneni <achitneni@apple.com>
Force-pushed 27c69ec to 3516259
I think we should be good to move this forward.
@akshaychitneni Please create tracking issues for the @astefanutti's suggestions, so we can track them: #2793 (comment)
/lgtm
/approve
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: andreyvelich. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing
What this PR does / why we need it:
Adds a dataset initializer to bootstrap the cache.
Which issue(s) this PR fixes (optional, in `Fixes #<issue number>, #<issue number>, ...` format, will close the issue(s) when PR gets merged): Fixes #2792
Checklist: